{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[AWS SDK for pandas](https://github.com/aws/aws-sdk-pandas)\n",
"\n",
"# 14 - Schema Evolution\n",
"\n",
"awswrangler supports adding new **columns** to existing Parquet and CSV datasets through:\n",
"\n",
"- [wr.s3.to_parquet()](https://aws-sdk-pandas.readthedocs.io/en/3.11.0/stubs/awswrangler.s3.to_parquet.html#awswrangler.s3.to_parquet)\n",
"- [wr.s3.store_parquet_metadata()](https://aws-sdk-pandas.readthedocs.io/en/3.11.0/stubs/awswrangler.s3.store_parquet_metadata.html#awswrangler.s3.store_parquet_metadata) i.e. \"Crawler\"\n",
"- [wr.s3.to_csv()](https://aws-sdk-pandas.readthedocs.io/en/3.11.0/stubs/awswrangler.s3.to_csv.html#awswrangler.s3.to_csv)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from datetime import date\n",
"\n",
"import pandas as pd\n",
"\n",
"import awswrangler as wr"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Enter your bucket name:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" ···········································\n"
]
}
],
"source": [
"import getpass\n",
"\n",
"bucket = getpass.getpass()\n",
"path = f\"s3://{bucket}/dataset/\""
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating the Dataset\n",
"### Parquet Create"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>value</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1</td>\n",
" <td>foo</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>2</td>\n",
" <td>boo</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id value\n",
"0 1 foo\n",
"1 2 boo"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = pd.DataFrame(\n",
" {\n",
" \"id\": [1, 2],\n",
" \"value\": [\"foo\", \"boo\"],\n",
" }\n",
")\n",
"\n",
"# Write the initial two-column dataset and register the table in the Glue Catalog\n",
"wr.s3.to_parquet(df=df, path=path, dataset=True, mode=\"overwrite\", database=\"aws_sdk_pandas\", table=\"my_table\")\n",
"\n",
"wr.s3.read_parquet(path, dataset=True)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
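"### CSV Create\n",
"\n",
"This cell writes to the same `path` and Glue table as the Parquet cell above with `mode=\"overwrite\"`, so running it replaces the Parquet dataset created there."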
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"df = pd.DataFrame(\n",
" {\n",
" \"id\": [1, 2],\n",
" \"value\": [\"foo\", \"boo\"],\n",
" }\n",
")\n",
"\n",
"wr.s3.to_csv(df=df, path=path, dataset=True, mode=\"overwrite\", database=\"aws_sdk_pandas\", table=\"my_table\")\n",
"\n",
"wr.s3.read_csv(path, dataset=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Schema Version 0 on Glue Catalog (AWS Console)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*Figure: table schema version 0 in the AWS Glue console.*"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Appending with NEW COLUMNS\n",
"### Parquet Append"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>value</th>\n",
" <th>date</th>\n",
" <th>flag</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>3</td>\n",
" <td>bar</td>\n",
" <td>2020-01-03</td>\n",
" <td>True</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>4</td>\n",
" <td>None</td>\n",
" <td>2020-01-04</td>\n",
" <td>False</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>1</td>\n",
" <td>foo</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2</td>\n",
" <td>boo</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id value date flag\n",
"0 3 bar 2020-01-03 True\n",
"1 4 None 2020-01-04 False\n",
"2 1 foo NaN NaN\n",
"3 2 boo NaN NaN"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = pd.DataFrame(\n",
" {\"id\": [3, 4], \"value\": [\"bar\", None], \"date\": [date(2020, 1, 3), date(2020, 1, 4)], \"flag\": [True, False]}\n",
")\n",
"\n",
"wr.s3.to_parquet(\n",
" df=df,\n",
" path=path,\n",
" dataset=True,\n",
" mode=\"append\",\n",
" database=\"aws_sdk_pandas\",\n",
" table=\"my_table\",\n",
" catalog_versioning=True, # Optional\n",
")\n",
"\n",
"# Old and new files now have different schemas, so skip the cross-file schema check\n",
"wr.s3.read_parquet(path, dataset=True, validate_schema=False)"
]
},
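{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `NaN` entries for the two original rows appear because the older files have no `date` or `flag` columns. The following is a minimal pandas-only sketch of the same effect. It is offered as an analogy, not as a description of how awswrangler combines files internally, and the names `old`, `new`, and `merged` are illustrative."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from datetime import date\n",
"\n",
"import pandas as pd\n",
"\n",
"# Rows written with the original two-column schema\n",
"old = pd.DataFrame({\"id\": [1, 2], \"value\": [\"foo\", \"boo\"]})\n",
"\n",
"# Rows written after the schema gained `date` and `flag`\n",
"new = pd.DataFrame(\n",
"    {\"id\": [3, 4], \"value\": [\"bar\", None], \"date\": [date(2020, 1, 3), date(2020, 1, 4)], \"flag\": [True, False]}\n",
")\n",
"\n",
"# The result has the union of the columns; values missing from `old` become NaN\n",
"merged = pd.concat([new, old], ignore_index=True)\n",
"merged"
]
},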
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"### CSV Append\n",
"\n",
"Note: schema evolution is disabled by default for CSV datasets because of [column ordering](https://docs.aws.amazon.com/athena/latest/ug/types-of-updates.html#updates-add-columns-beginning-middle-of-table) constraints. Enable it by passing the `schema_evolution=True` flag."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"df = pd.DataFrame(\n",
" {\"id\": [3, 4], \"value\": [\"bar\", None], \"date\": [date(2020, 1, 3), date(2020, 1, 4)], \"flag\": [True, False]}\n",
")\n",
"\n",
"wr.s3.to_csv(\n",
" df=df,\n",
" path=path,\n",
" dataset=True,\n",
" mode=\"append\",\n",
" database=\"aws_sdk_pandas\",\n",
" table=\"my_table\",\n",
" schema_evolution=True,\n",
" catalog_versioning=True, # Optional\n",
")\n",
"\n",
"wr.s3.read_csv(path, dataset=True, validate_schema=False)"
]
},
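{
"cell_type": "markdown",
"metadata": {},
"source": [
"Column order matters for CSV because the rows carry no column names, so values are matched to the schema by position. The pandas-only sketch below assumes header-less rows shaped like the first write (the `old_rows` string is made up for illustration). It shows why adding columns at the end is safe while inserting one in the middle shifts existing values under the wrong name."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import io\n",
"\n",
"import pandas as pd\n",
"\n",
"# Hypothetical header-less rows written under the original (id, value) schema\n",
"old_rows = \"1,foo\\n2,boo\\n\"\n",
"\n",
"# New columns appended at the end: old values still line up, new columns are empty\n",
"appended = pd.read_csv(io.StringIO(old_rows), header=None, names=[\"id\", \"value\", \"date\", \"flag\"])\n",
"print(appended)\n",
"\n",
"# New column inserted in the middle: the old `value` strings now land under `date`\n",
"inserted = pd.read_csv(io.StringIO(old_rows), header=None, names=[\"id\", \"date\", \"value\", \"flag\"])\n",
"print(inserted)"
]
},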
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Schema Version 1 on Glue Catalog (AWS Console)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*Figure: table schema version 1 in the AWS Glue console.*"
]
},
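{
"cell_type": "markdown",
"metadata": {},
"source": [
"The registered schema can also be checked from code rather than the console. This is a sketch that assumes the `my_table` table in the `aws_sdk_pandas` database still exists and that AWS credentials are configured as in the cells above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import awswrangler as wr\n",
"\n",
"# Mapping of column name -> Athena/Glue type currently registered for the table\n",
"print(wr.catalog.get_table_types(database=\"aws_sdk_pandas\", table=\"my_table\"))\n",
"\n",
"# The same information as a DataFrame, one row per column\n",
"wr.catalog.table(database=\"aws_sdk_pandas\", table=\"my_table\")"
]
},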
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Reading from Athena"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>id</th>\n",
" <th>value</th>\n",
" <th>date</th>\n",
" <th>flag</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>3</td>\n",
" <td>bar</td>\n",
" <td>2020-01-03</td>\n",
" <td>True</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>4</td>\n",
" <td>None</td>\n",
" <td>2020-01-04</td>\n",
" <td>False</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>1</td>\n",
" <td>foo</td>\n",
" <td>None</td>\n",
" <td><NA></td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>2</td>\n",
" <td>boo</td>\n",
" <td>None</td>\n",
" <td><NA></td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" id value date flag\n",
"0 3 bar 2020-01-03 True\n",
"1 4 None 2020-01-04 False\n",
"2 1 foo None <NA>\n",
"3 2 boo None <NA>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wr.athena.read_sql_table(table=\"my_table\", database=\"aws_sdk_pandas\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Cleaning Up"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Remove the S3 objects and the Glue table created by this tutorial\n",
"wr.s3.delete_objects(path)\n",
"wr.catalog.delete_table_if_exists(table=\"my_table\", database=\"aws_sdk_pandas\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.9.14",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.14"
}
},
"nbformat": 4,
"nbformat_minor": 4
}